---
title: Async Memory
description: 'Asynchronous memory for Mem0'
icon: "bolt"
iconType: "solid"
---

## AsyncMemory

The `AsyncMemory` class is a direct asynchronous interface to Mem0's in-process memory operations. Unlike the API client, which interacts with the Mem0 API, `AsyncMemory` works directly with the underlying storage systems. This makes it a good fit for applications that embed Mem0 directly in their own codebase.

### Initialization

To use `AsyncMemory`, import it from the `mem0` package:

```python Python
import asyncio
from mem0 import AsyncMemory

# Initialize with default configuration
memory = AsyncMemory()

# Or initialize with custom configuration
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
    # Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
```

### Key Features

1. **Non-blocking Operations** - All memory operations use `asyncio` to avoid blocking the event loop
2. **Concurrent Processing** - Parallel execution of vector store and graph operations
3. **Efficient Resource Utilization** - Better handling of I/O bound operations
4. **Compatible with Async Frameworks** - Seamless integration with FastAPI, aiohttp, and other async frameworks
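
To make the first two points concrete, here is a minimal sketch of how awaiting several operations together overlaps their waiting time. `FakeAsyncMemory` is a hypothetical stand-in that simulates I/O latency with `asyncio.sleep`; it is not part of Mem0, and `search_many` and `timed_demo` are illustrative names only. A real `AsyncMemory` instance would be awaited the same way.

```python
import asyncio
import time


class FakeAsyncMemory:
    """Hypothetical stand-in for AsyncMemory; simulates I/O with a sleep."""

    async def search(self, query: str, user_id: str) -> dict:
        await asyncio.sleep(0.2)  # pretend to wait on a vector store
        return {"results": [{"memory": f"{user_id}: {query}"}]}


async def search_many(memory, queries: list[str], user_id: str) -> list[dict]:
    # gather() schedules all coroutines at once, so their waits overlap
    return await asyncio.gather(
        *(memory.search(query=q, user_id=user_id) for q in queries)
    )


async def timed_demo() -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = await search_many(FakeAsyncMemory(), ["a", "b", "c"], "alice")
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(timed_demo())
    print(f"{len(results)} searches finished in {elapsed:.2f}s")
```

Run one after another, the three simulated searches would take about 0.6 seconds; awaited together they take roughly as long as the slowest one.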

### Methods

All methods in `AsyncMemory` take the same parameters as their counterparts in the synchronous `Memory` class, but each one is a coroutine and must be called with `await`. The snippets below assume they run inside an `async` function.

#### Create memories

Add a new memory asynchronously:

```python Python
try:
    result = await memory.add(
        messages=[
            {"role": "user", "content": "I'm travelling to SF"},
            {"role": "assistant", "content": "That's great to hear!"}
        ],
        user_id="alice"
    )
    print("Memory added successfully:", result)
except Exception as e:
    print(f"Error adding memory: {e}")
```

#### Retrieve memories

Retrieve memories related to a query:

```python Python
try:
    results = await memory.search(
        query="Where am I travelling?",
        user_id="alice"
    )
    print("Found memories:", results)
except Exception as e:
    print(f"Error searching memories: {e}")
```

#### List memories

List all memories for a `user_id`, `agent_id`, and/or `run_id`:

```python Python
try:
    all_memories = await memory.get_all(user_id="alice")
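    # Results are wrapped in a dict under the "results" key, as in the search example later on this page
    print(f"Retrieved {len(all_memories['results'])} memories")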
except Exception as e:
    print(f"Error retrieving memories: {e}")
```

#### Get specific memory

Retrieve a specific memory by its ID:

```python Python
try:
    specific_memory = await memory.get(memory_id="memory-id-here")
    print("Retrieved memory:", specific_memory)
except Exception as e:
    print(f"Error retrieving memory: {e}")
```

#### Update memory

Update an existing memory by ID:

```python Python
try:
    updated_memory = await memory.update(
        memory_id="memory-id-here",
        data="I'm travelling to Seattle"
    )
    print("Memory updated successfully:", updated_memory)
except Exception as e:
    print(f"Error updating memory: {e}")
```

#### Delete memory

Delete a specific memory by ID:

```python Python
try:
    result = await memory.delete(memory_id="memory-id-here")
    print("Memory deleted successfully")
except Exception as e:
    print(f"Error deleting memory: {e}")
```

#### Delete all memories

Delete all memories for a specific user, agent, or run:

```python Python
try:
    result = await memory.delete_all(user_id="alice")
    print("All memories deleted successfully")
except Exception as e:
    print(f"Error deleting memories: {e}")
```

<Note>
At least one filter (`user_id`, `agent_id`, or `run_id`) is required when using `delete_all`.
</Note>
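
Because an unscoped `delete_all` is not allowed, it can help to validate the scope before making the call. The sketch below uses a hypothetical helper, `scope_kwargs`, which is not part of Mem0: it drops unset identifiers and raises `ValueError` when none is given.

```python
from typing import Optional


def scope_kwargs(
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
) -> dict:
    """Hypothetical helper: return only the identifiers that were set."""
    scope = {"user_id": user_id, "agent_id": agent_id, "run_id": run_id}
    scope = {key: value for key, value in scope.items() if value}
    if not scope:
        raise ValueError("Provide at least one of user_id, agent_id, or run_id")
    return scope
```

The result can then be unpacked into the call, for example `await memory.delete_all(**scope_kwargs(user_id="alice"))`.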

### Advanced Memory Organization

`AsyncMemory` supports the same three-parameter organization system as the synchronous `Memory` class. Memories can be scoped by `user_id`, `agent_id`, and `run_id`, in any combination:

```python Python
# Store memories with full context
await memory.add(
    messages=[{"role": "user", "content": "I prefer vegetarian food"}],
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

# Retrieve memories with different scopes
all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
    user_id="alice", 
    agent_id="diet-assistant", 
    run_id="consultation-001"
)

# Search with context
general_search = await memory.search("What do you know about me?", user_id="alice")
agent_search = await memory.search("What do you know about me?", user_id="alice", agent_id="diet-assistant")
session_search = await memory.search("What do you know about me?", user_id="alice", run_id="consultation-001")
```

#### Memory History

Get the history of changes for a specific memory:

```python Python
try:
    history = await memory.history(memory_id="memory-id-here")
    print("Memory history:", history)
except Exception as e:
    print(f"Error retrieving history: {e}")
```

### Example: Concurrent Usage with Other APIs

`AsyncMemory` combines well with other async clients. The example below uses it alongside the async OpenAI client; both run as coroutines on the same event loop rather than in separate threads, so neither blocks the other while waiting on I/O:

```python Python
import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory

async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()

async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    try:
        # Retrieve relevant memories
        search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
        relevant_memories = search_result["results"]
        memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)
        
        # Generate Assistant response
        system_prompt = f"You are a helpful AI. Answer the question based on query and memories.\nUser Memories:\n{memories_str}"
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": message}]
        response = await async_openai_client.chat.completions.create(model="gpt-4o-mini", messages=messages)
        assistant_response = response.choices[0].message.content

        # Create new memories from the conversation
        messages.append({"role": "assistant", "content": assistant_response})
        await async_memory.add(messages, user_id=user_id)

        return assistant_response
    except Exception as e:
        print(f"Error in chat_with_memories: {e}")
        return "I apologize, but I encountered an error processing your request."

async def async_main():
    print("Chat with AI (type 'exit' to quit)")
    while True:
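        # Read input in a worker thread so the blocking call does not stall the event loop
        user_input = (await asyncio.to_thread(input, "You: ")).strip()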
        if user_input.lower() == 'exit':
            print("Goodbye!")
            break
        response = await chat_with_memories(user_input)
        print(f"AI: {response}")

def main():
    asyncio.run(async_main())

if __name__ == "__main__":
    main()
```

## Error Handling and Best Practices

### Common Error Types

When working with `AsyncMemory`, you may encounter these common errors:

#### Connection and Configuration Errors

```python Python
import asyncio
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

async def handle_initialization_errors():
    try:
        # Initialize with custom config
        config = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-4o-mini"}}
        )
        memory = AsyncMemory(config=config)
        print("AsyncMemory initialized successfully")
    except ValueError as e:
        print(f"Configuration error: {e}")
    except ConnectionError as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected initialization error: {e}")

asyncio.run(handle_initialization_errors())
```

#### Memory Operation Errors

```python Python
from mem0 import AsyncMemory

async def handle_memory_operation_errors():
    memory = AsyncMemory()
    
    try:
        # Memory not found error
        result = await memory.get(memory_id="non-existent-id")
    except ValueError as e:
        print(f"Invalid memory ID: {e}")
    except Exception as e:
        print(f"Memory retrieval error: {e}")
    
    try:
        # Invalid search parameters
        results = await memory.search(query="", user_id="alice")
    except ValueError as e:
        print(f"Invalid search query: {e}")
    except Exception as e:
        print(f"Search error: {e}")
```

### Performance Optimization

#### Concurrent Operations

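Independent operations can run concurrently with `asyncio.gather`. Passing `return_exceptions=True` returns each failure as a value, so one failed task does not hide the results of the others: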

```python Python
import asyncio
from mem0 import AsyncMemory

async def batch_operations():
    memory = AsyncMemory()
    
    # Process multiple operations concurrently
    tasks = []
    for i in range(5):
        task = memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}"
        )
        tasks.append(task)
    
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} completed successfully")
    except Exception as e:
        print(f"Batch operation error: {e}")
```
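
When a batch is large, launching every call at once can overwhelm the LLM or vector store backend. One common option is to cap the number of in-flight operations with `asyncio.Semaphore`. This is a sketch, not a Mem0 feature: `gather_bounded` is a hypothetical helper, and the `fake_add` coroutine only stands in for `memory.add`.

```python
import asyncio


async def gather_bounded(coro_factories, limit: int = 3) -> list:
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:  # wait here until a slot is free
            return await factory()

    # return_exceptions=True returns failures as values instead of raising the first one
    return await asyncio.gather(
        *(run_one(f) for f in coro_factories), return_exceptions=True
    )


async def demo() -> tuple[list, int]:
    in_flight = 0
    peak = 0

    async def fake_add(i: int) -> str:  # stand-in for memory.add(...)
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)  # record the highest concurrency seen
        await asyncio.sleep(0.01)
        in_flight -= 1
        return f"added {i}"

    factories = [lambda i=i: fake_add(i) for i in range(10)]
    results = await gather_bounded(factories, limit=3)
    return results, peak
```

Factories are used instead of coroutine objects so that each call is only created once a slot is free. With a real instance, each factory would wrap a call such as `memory.add(messages=msgs, user_id=uid)`.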

#### Resource Management

Wrapping `AsyncMemory` in an async context manager gives you a single place to add cleanup logic:

```python Python
import asyncio
from contextlib import asynccontextmanager
from mem0 import AsyncMemory

@asynccontextmanager
async def get_memory():
    memory = AsyncMemory()
    try:
        yield memory
    finally:
        # Clean up resources if needed
        pass

async def safe_memory_usage():
    async with get_memory() as memory:
        try:
            result = await memory.search("test query", user_id="alice")
            return result
        except Exception as e:
            print(f"Memory operation failed: {e}")
            return None
```

### Timeout and Retry Strategies

Wrap operations in a timeout and retry them with exponential backoff to tolerate transient failures:

```python Python
import asyncio

async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    last_error = None
    for attempt in range(max_retries):
        try:
            # wait_for cancels the operation if it exceeds the timeout
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError as e:
            last_error = e
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as e:
            last_error = e
            print(f"Error on attempt {attempt + 1}: {e}")

        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...

    # Chain the last failure so the root cause stays in the traceback
    raise RuntimeError(f"Operation failed after {max_retries} attempts") from last_error

# Usage example
async def robust_memory_search():
    memory = AsyncMemory()
    
    async def search_operation():
        return await memory.search("test query", user_id="alice")
    
    try:
        result = await with_timeout_and_retry(search_operation)
        print("Search successful:", result)
    except Exception as e:
        print(f"Search failed permanently: {e}")
```

### Integration with Async Frameworks

#### FastAPI Integration

```python Python
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory
import asyncio

app = FastAPI()
memory = AsyncMemory()

@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    try:
        result = await memory.add(messages=messages, user_id=user_id)
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    try:
        result = await memory.search(query=query, user_id=user_id, limit=limit)
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```

### Troubleshooting Guide

| Issue | Possible Causes | Solutions |
|-------|----------------|-----------|
| **Initialization fails** | Missing dependencies, invalid config | Check dependencies, validate configuration |
| **Slow operations** | Large datasets, network latency | Implement caching, optimize queries |
| **Memory not found** | Invalid memory ID, deleted memory | Validate IDs, implement existence checks |
| **Connection timeouts** | Network issues, server overload | Implement retry logic, check network |
| **Out of memory errors** | Large batch operations | Process in smaller batches |
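
For the "Slow operations" row, one possible form of caching is a small in-process cache with a time-to-live in front of `search`. The `TTLSearchCache` class below is a hypothetical sketch, not a Mem0 API. It assumes the wrapped object exposes an async `search(query=..., user_id=...)` method as shown earlier on this page, and `StubMemory` exists only to make the example runnable.

```python
import asyncio
import time


class TTLSearchCache:
    """Hypothetical wrapper that caches search results for `ttl` seconds."""

    def __init__(self, memory, ttl: float = 30.0):
        self._memory = memory
        self._ttl = ttl
        self._entries: dict = {}  # (query, user_id) -> (expires_at, result)

    async def search(self, query: str, user_id: str):
        key = (query, user_id)
        cached = self._entries.get(key)
        if cached and cached[0] > time.monotonic():
            return cached[1]  # still fresh: skip the backend call
        result = await self._memory.search(query=query, user_id=user_id)
        self._entries[key] = (time.monotonic() + self._ttl, result)
        return result


class StubMemory:
    """Stand-in for AsyncMemory that counts how often search is called."""

    def __init__(self):
        self.calls = 0

    async def search(self, query: str, user_id: str) -> dict:
        self.calls += 1
        return {"results": [{"memory": f"{user_id}: {query}"}]}


async def demo() -> int:
    stub = StubMemory()
    cache = TTLSearchCache(stub, ttl=60.0)
    await cache.search("diet", user_id="alice")
    await cache.search("diet", user_id="alice")  # served from the cache
    await cache.search("diet", user_id="bob")    # different key: new call
    return stub.calls
```

Cached entries can go stale after an `add`, `update`, or `delete`, so a short TTL or explicit invalidation is advisable.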

### Monitoring and Logging

Add comprehensive logging to your async memory operations:

```python Python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.perf_counter()  # monotonic clock, safe for measuring durations
            logger.info(f"Starting {operation_name}")
            try:
                result = await func(*args, **kwargs)
                duration = time.perf_counter() - start_time
                logger.info(f"{operation_name} completed in {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.perf_counter() - start_time
                logger.error(f"{operation_name} failed after {duration:.2f}s: {e}")
                raise
        return wrapper
    return decorator

@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    return await memory.add(messages=messages, user_id=user_id)
```

If you have any questions or need further assistance, please don't hesitate to reach out:

<Snippet file="get-help.mdx" />
